N-Way merge with remote data

How to merge multiple sorted remote data streams using the heapq.merge function that ships with Python.
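Before touching any engines, it helps to see what heapq.merge does with purely local data: given several already-sorted iterables, it lazily interleaves them into a single sorted stream. A minimal local warm-up:

In [ ]:
```python
import heapq

# Three already-sorted sequences, standing in for sorted per-engine datasets
a = [1, 4, 7]
b = [2, 5, 8]
c = [3, 6, 9]

# heapq.merge lazily yields the next smallest item across all inputs
merged = list(heapq.merge(a, b, c))
print(merged)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because heapq.merge only ever asks each input for its next item, it works just as well when the inputs are iterators pulling data from remote engines, which is the trick this notebook builds up to.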


In [ ]:
from __future__ import print_function
import heapq

from IPython.display import display
from IPython import parallel

rc = parallel.Client()

Imagine we have some routine that is capable of loading/creating a sorted subset of our data on an engine, based on a parameter (such as the index of which part of the data to read):


In [ ]:
def load_data(arg):
    """Load a dataset in the global namespace. The dataset *must* be sorted.

    Return the *name* of the variable in which the dataset was loaded."""
    global data
    # Here, real data loading would occur
    s = 4 - arg
    step = arg + 1
    data = range(s, s+4*step**2, step)
    return 'data'
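To get a feel for the shape of these datasets, we can run the same logic locally for a few values of arg (no engines involved; load_data_local is a local copy that returns the data itself rather than a variable name):

In [ ]:
```python
def load_data_local(arg):
    """Local copy of load_data's logic, returning the dataset directly."""
    s = 4 - arg
    step = arg + 1
    return list(range(s, s + 4 * step**2, step))

for arg in range(3):
    print(arg, load_data_local(arg))
# 0 [4, 5, 6, 7]
# 1 [3, 5, 7, 9, 11, 13, 15, 17]
# 2 [2, 5, 8, 11, 14, 17, 20, 23, 26, 29, 32, 35]
```

Each subset is sorted on its own, but the subsets overlap, which is exactly the situation an n-way merge is designed for.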

Exercise

We want a function that takes a given single-engine View and a variable name, and returns a local iterator on the remote object. It should look something like this skeleton function:


In [ ]:
def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine."""
    # TODO: create an iterator remotely
    while True:
        pass
        # TODO: yield the next item
        # TODO: turn remote StopIteration into local StopIteration

Relevant Aside:

Errors raised on engines will show up in the Client as a RemoteError. This means you have to be a little careful when trying to catch remote errors:


In [ ]:
try:
    rc[-1].execute("foo = barbarbar", block=True)
except NameError:
    print("caught NameError")
except Exception as e:
    print("Oops! Didn't catch %r" % e)
    raise
print("safe and sound")

A RemoteError has three attributes:

  • err.ename - the class name of the remote error (e.g. NameError, ValueError)
  • err.evalue - the string value of the error message
  • err.traceback - the remote traceback as a list of strings

For simple builtin exceptions, you can re-raise remote errors as the original exception class with code like the following:


In [ ]:
def assign_foo():
    try:
        rc[-1].execute("foo = barbarbar", block=True)
    except parallel.RemoteError as e:
        if e.ename == 'NameError':
            raise NameError(e.evalue)
        else:
            raise

By doing this re-cast, any exception handling outside will handle remote exceptions as if they were local.
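The control flow of this re-cast pattern can be exercised without a cluster. Here FakeRemoteError is a hypothetical stand-in for parallel.RemoteError, used only to simulate what the client sees when an engine raises NameError:

In [ ]:
```python
class FakeRemoteError(Exception):
    """Hypothetical stand-in for parallel.RemoteError (local demo only)."""
    def __init__(self, ename, evalue):
        super(FakeRemoteError, self).__init__(ename, evalue)
        self.ename = ename
        self.evalue = evalue

def assign_foo_local():
    # Simulate an engine raising NameError, surfaced as a remote-error wrapper
    try:
        raise FakeRemoteError('NameError', "name 'barbarbar' is not defined")
    except FakeRemoteError as e:
        if e.ename == 'NameError':
            raise NameError(e.evalue)
        raise

try:
    assign_foo_local()
except NameError:
    print("caught NameError")
print("safe and sound")
```

The wrapper exception carries the remote class name as a string, and the re-cast turns that string back into a real local exception class that ordinary except clauses can match.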


In [ ]:
try:
    assign_foo()
except NameError:
    print("caught NameError")
except Exception as e:
    print("Oops! Didn't catch %r" % e)
    raise
print("safe and sound")

Can you fill out this remote_iterator function?

Potentially useful:

  • catching RemoteErrors
  • parallel.Reference
  • yield

In [ ]:
def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine."""
    # TODO: create an iterator remotely
    while True:
        pass
        # TODO: yield the next item
        # TODO: turn remote StopIteration into local StopIteration

A local example that should be a good guideline for the remote version:


In [ ]:
%load soln/remote_iter_hint.py
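In case the soln/ files aren't at hand, the local pattern looks roughly like this: step through an iterator with explicit next() calls and end the generator cleanly when the source is exhausted. The remote version does the same thing, except that the next() call happens on the engine and exhaustion arrives as a RemoteError whose ename is 'StopIteration':

In [ ]:
```python
def local_iterator(obj):
    """Local analogue of remote_iterator: pull items one next() at a time."""
    it = iter(obj)
    while True:
        try:
            item = next(it)
        except StopIteration:
            return  # ending the generator signals StopIteration to the caller
        yield item

print(list(local_iterator(range(3, 8))))  # [3, 4, 5, 6, 7]
```

Using return rather than raising StopIteration inside the generator is the safe choice: since PEP 479 (Python 3.7), a StopIteration raised inside a generator body is converted to a RuntimeError.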

And the solution:


In [ ]:
%load soln/remote_iter.py

And an ever-so-slightly fancier solution:


In [ ]:
%load soln/remote_iter_slightly_better.py

Now, we bring IPython.parallel into action:


In [ ]:
dview = rc.direct_view()
print('Engine IDs:', rc.ids)

In [ ]:
# Load the data on the engines
data_refs = dview.map(load_data, rc.ids)

In [ ]:
data_refs

In [ ]:
list(data_refs)

In [ ]:
# And we now make local objects which represent the remote iterators
iterators = [remote_iterator(rc[e], ref) for e, ref in zip(rc.ids, data_refs)]
for it in iterators:
    print(list(it))

Now, let's merge those datasets into a single sorted one:


In [ ]:
print('Locally merge the remote sets:')
iterators = [remote_iterator(rc[e], ref) for e, ref in zip(rc.ids, data_refs)]
remote = list(heapq.merge(*iterators))
print(remote)
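The exact output depends on how many engines are running, but assuming, for illustration, a cluster with engine IDs [0, 1], the merged stream can be reproduced locally (load_data_local mirrors load_data's logic):

In [ ]:
```python
import heapq

def load_data_local(arg):
    """Local copy of load_data's logic, returning the dataset directly."""
    s = 4 - arg
    step = arg + 1
    return list(range(s, s + 4 * step**2, step))

subsets = [load_data_local(e) for e in [0, 1]]  # hypothetical engine IDs
merged = list(heapq.merge(*subsets))
print(merged)  # [3, 4, 5, 5, 6, 7, 7, 9, 11, 13, 15, 17]
assert merged == sorted(merged)
```

Note that duplicates across subsets (5 and 7 here) are preserved: heapq.merge produces a sorted multiset union, not a deduplicated one.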

Validation

We repeat the operation by copying the data from the engines to our local namespace and doing a regular merge locally:


In [ ]:
# Key step here: pull data from each engine:
local_data = [rc[e][ref] for e,ref in zip(rc.ids, data_refs)]
print('Local data:')
for subset in local_data:
    print(subset)
print('Sorted:')
local = list(heapq.merge(*local_data))
print(local)
print("local == remote: %s" % (local==remote))
